{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "9f1a36d1-2050-4a92-8308-2c02f725f88b",
   "metadata": {
    "tags": []
   },
   "source": [
    "# Build Basic RAG App\n",
    "\n",
    "This tutorial demonstrates a complete Retrieval-Augmented Generation (RAG) system using Ray ecosystem components for scalable AI workflows.\n",
    "\n",
    "Since we have already built the data ingestion pipeline in notebook #2, we'll just show how to buid the user query pipeline for RAG.\n",
    "\n",
    "It showcases embedding generation, vector search, context-aware prompting, and streaming response delivery—all in a single executable pipeline.\n",
    "\n",
    "Here is the architecture diagram:\n",
    "\n",
    "<img src=\"https://raw.githubusercontent.com/ray-project/ray/refs/heads/master/doc/source/ray-overview/examples/e2e-rag/images/rag-user-query-pipeline.png\" width=800>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "68e9563e",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert-warning\">\n",
    "  <b>Anyscale-Specific Configuration</b>\n",
    "  \n",
    "  <p>Note: This tutorial is optimized for the Anyscale platform. When running on open source Ray, additional configuration is required. For example, you’ll need to manually:</p>\n",
    "  \n",
    "  <ul>\n",
    "    <li>\n",
    "      <b>Configure your Ray Cluster:</b> Set up your multi-node environment (including head and worker nodes) and manage resource allocation (e.g., autoscaling, GPU/CPU assignments) without the Anyscale automation. See the Ray Cluster Setup documentation for details: <a href=\"https://docs.ray.io/en/latest/cluster/getting-started.html\">https://docs.ray.io/en/latest/cluster/getting-started.html</a>.\n",
    "    </li>\n",
    "    <li>\n",
    "      <b>Manage Dependencies:</b> Install and manage dependencies on each node since you won’t have Anyscale’s Docker-based dependency management. Refer to the Ray Installation Guide for instructions on installing and updating Ray in your environment: <a href=\"https://docs.ray.io/en/latest/ray-core/handling-dependencies.html\">https://docs.ray.io/en/latest/ray-core/handling-dependencies.html</a>.\n",
    "    </li>\n",
    "    <li>\n",
    "      <b>Set Up Storage:</b> Configure your own distributed or shared storage system (instead of relying on Anyscale’s integrated cluster storage). Check out the Ray Cluster Configuration guide for suggestions on setting up shared storage solutions: <a href=\"https://docs.ray.io/en/latest/train/user-guides/persistent-storage.html\">https://docs.ray.io/en/latest/train/user-guides/persistent-storage.html</a>.\n",
    "    </li>\n",
    "  </ul>\n",
    "\n",
    "</div>\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1d941fb3",
   "metadata": {},
   "source": [
    "## Prerequisites\n",
    "\n",
    "Before you move on to the next steps, please make sure you have all the required prerequisites in place."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d3cf400f",
   "metadata": {},
   "source": [
    "\n",
    "<div class=\"alert alert-block alert-warning\"> <b> Pre-requisite #1: You must have finished the data ingestion in Chroma DB with CHROMA_PATH = \"/mnt/cluster_storage/vector_store\" and CHROMA_COLLECTION_NAME = \"anyscale_jobs_docs_embeddings\". For setup details, please refer to Notebook #2.</b> \n",
    "</div>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f3466b98",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert-warning\"> <b> Pre-requisite #2: You must have deployed the LLM service with `Qwen/Qwen2.5-32B-Instruct` model. For setup details, please refer to Notebook #3.</b> \n",
    "</div>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a92ea911",
   "metadata": {},
   "source": [
    "## Verify the LLM service\n",
    "\n",
    "First, let's first verify the LLM service is available"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8df0687c",
   "metadata": {
    "tags": []
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Model response:\n",
      "Anyscale Jobs likely refers to job opportunities or roles within the context of Anyscale, a company that specializes in scalable computing solutions. Anyscale is known for developing Ray, an open-source framework for building distributed applications. The company focuses on making it easier for developers and researchers to scale their applications and machine learning models across multiple machines.\n",
      "\n",
      "If you're looking for information on specific job openings at Anyscale, you would typically find these on the company's official website under a \"Careers\" or \"Jobs\" section, or on popular job listing platforms. Positions might include roles in software engineering, machine learning, data science, and other technical fields, given the company's focus on scalable computing and distributed systems.\n"
     ]
    }
   ],
   "source": [
    "from rag_utils import LLMClient\n",
    "\n",
    "# Initialize client\n",
    "model_id='Qwen/Qwen2.5-32B-Instruct' ## model id need to be same as your deployment \n",
    "base_url = \"https://llm-service-qwen-32b-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com/\" ## replace with your own service base url\n",
    "api_key = \"a1ndpMKaXi76sTIfr_afmx8HynFA1fg-TGaZ2gUuDG0\" ## replace with your own api key\n",
    "\n",
    "client = LLMClient(base_url=base_url, api_key=api_key, model_id=model_id)\n",
    "\n",
    "prompt = \"what is anyscale jobs\"\n",
    "print(\"Model response:\")\n",
    "for token in client.get_response_streaming(prompt, temperature=0):\n",
    "    print(token, end=\"\")\n",
    "print()  # For newline after the streamed response"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c5ceb2b6",
   "metadata": {},
   "source": [
    "## Observation and Why we need RAG?\n",
    "\n",
    "The response refers to job openings at Anyscale instead of explaining what \"Anyscale jobs\" are in the context of the platform: https://docs.anyscale.com/platform/jobs/\n",
    "\n",
    "This demonstrates why RAG is needed—by integrating domain-specific context, we help the LLM produce more accurate answers."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0665e6f3",
   "metadata": {},
   "source": [
    "## Build the embedder\n",
    "\n",
    "Similar to previous tutorials, we use the `SentenceTransformer` library to convert text strings into numerical embeddings. The embedder automatically utilizes a CUDA-enabled GPU if available, making it efficient for both single and batch processing."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "87ebe714",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from typing import Dict, List, Union\n",
    "import torch\n",
    "import numpy as np\n",
    "from sentence_transformers import SentenceTransformer\n",
    "\n",
    "class Embedder:\n",
    "    def __init__(self, model_name: str = \"intfloat/multilingual-e5-large-instruct\"):\n",
    "        self.model_name = model_name\n",
    "        self.model = SentenceTransformer(\n",
    "            self.model_name,\n",
    "            device=\"cuda\" if torch.cuda.is_available() else \"cpu\"\n",
    "        )\n",
    "    \n",
    "    def embed_single(self, text: str) -> np.ndarray:\n",
    "        \"\"\"Generate an embedding for a single text string.\"\"\"\n",
    "        return self.model.encode(text, convert_to_numpy=True)\n",
    "    \n",
    "    def embed_batch(self, texts: List[str]) -> np.ndarray:\n",
    "        \"\"\"Generate embeddings for a batch (list) of text strings.\"\"\"\n",
    "        return self.model.encode(texts, convert_to_numpy=True)\n",
    "    "
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6aa08a0b",
   "metadata": {},
   "source": [
    "## Query the Chroma DB\n",
    "\n",
    "Similar to `ChromaWrite` class in previous tutotials, we define a `ChromaQuerier` class that acts as an interface to a Chroma vector store, enabling efficient retrieval of document chunks based on similarity to a provided query embedding. \n",
    "\n",
    "It processes raw results by reformatting and filtering them according to a defined score threshold, ensuring that only the most relevant information is returned. As part of a Retrieval-Augmented Generation (RAG) workflow, this setup helps integrate precise, contextually significant data into subsequent generation steps.\n",
    "\n",
    "The `score_threshold` parameter sets the minimum acceptable similarity score for a result to be considered relevant. In this code, each result's score is calculated as 1 minus its distance, meaning that lower distances (indicating higher similarity) result in higher scores. By filtering out any results with scores below the score_threshold (defaulted to 0.8), the code ensures that only the most contextually relevant documents are returned.\n",
    "\n",
    "We also implement two special method ,  `__getstate__` and `__setstate__`, which are  special hooks in Python's pickling protocol :\n",
    "* `__getstate__`: Prepares the object for pickling by removing attributes that can’t be serialized, ensuring that only the essential state is saved.\n",
    "* `__setstate__`: Rebuilds the object after unpickling by restoring its state and reinitializing the unpickleable components so that the object remains fully functional.\n",
    "\n",
    "These two functions will prevent the error such as `TypeError: cannot pickle 'weakref.ReferenceType' object` when you use `map_batches` during batching processing with Ray data. "
   ]
  },
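  {
   "cell_type": "markdown",
   "id": "c7d1e5a2",
   "metadata": {},
   "source": [
    "The pickling hooks described above can be sketched in isolation. This is a minimal illustration, not the notebook's `ChromaQuerier`: the `Handle` and `Querier` classes are hypothetical stand-ins, and a `threading.Lock` plays the role of the unpicklable Chroma client.\n",
    "\n",
    "```python\n",
    "import pickle\n",
    "import threading\n",
    "\n",
    "\n",
    "class Handle:\n",
    "    # Hypothetical stand-in for a client object that cannot be pickled.\n",
    "    def __init__(self, path):\n",
    "        self.path = path\n",
    "        self._lock = threading.Lock()  # Lock objects are not picklable.\n",
    "\n",
    "\n",
    "class Querier:\n",
    "    def __init__(self, path):\n",
    "        self.path = path\n",
    "        self._init_client()\n",
    "\n",
    "    def _init_client(self):\n",
    "        self.client = Handle(self.path)\n",
    "\n",
    "    def __getstate__(self):\n",
    "        # Drop the unpicklable client and keep only plain configuration.\n",
    "        state = self.__dict__.copy()\n",
    "        state.pop('client', None)\n",
    "        return state\n",
    "\n",
    "    def __setstate__(self, state):\n",
    "        # Restore the configuration, then rebuild the client on the receiving side.\n",
    "        self.__dict__.update(state)\n",
    "        self._init_client()\n",
    "\n",
    "\n",
    "original = Querier('/tmp/example_store')\n",
    "restored = pickle.loads(pickle.dumps(original))\n",
    "print(restored.path, type(restored.client).__name__)\n",
    "```\n",
    "\n",
    "Without the two hooks, `pickle.dumps(original)` would raise a `TypeError` because of the lock. Ray's serialization builds on the pickle protocol, so the same pattern should apply when an instance is shipped to workers."
   ]
  },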
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "96c402d4",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from pprint import pprint\n",
    "import chromadb\n",
    "\n",
    "\n",
    "\n",
    "class ChromaQuerier:\n",
    "    \"\"\"\n",
    "    A class to query a Chroma database collection and return formatted search results.\n",
    "    \"\"\"\n",
    "    def __init__(\n",
    "        self, \n",
    "        chroma_path: str, \n",
    "        chroma_collection_name: str,\n",
    "        score_threshold: float = 0.8  # Define a default threshold value if needed.\n",
    "    ):\n",
    "        \"\"\"\n",
    "        Initialize the ChromaQuerier with the specified Chroma DB settings and score threshold.\n",
    "        \"\"\"\n",
    "        self.chroma_path = chroma_path\n",
    "        self.chroma_collection_name = chroma_collection_name\n",
    "        self.score_threshold = score_threshold\n",
    "\n",
    "        # Initialize the persistent client and collection.\n",
    "        self._init_chroma_client()\n",
    "\n",
    "    def _init_chroma_client(self):\n",
    "        \"\"\"\n",
    "        Initialize or reinitialize the Chroma client and collection.\n",
    "        \"\"\"\n",
    "        self.chroma_client = chromadb.PersistentClient(path=self.chroma_path)\n",
    "        self.collection = self.chroma_client.get_or_create_collection(name=self.chroma_collection_name)\n",
    "\n",
    "    def __getstate__(self):\n",
    "        \"\"\"\n",
    "        Customize pickling by excluding the unpickleable Chroma client and collection.\n",
    "        \"\"\"\n",
    "        state = self.__dict__.copy()\n",
    "        state.pop(\"chroma_client\", None)\n",
    "        state.pop(\"collection\", None)\n",
    "        return state\n",
    "\n",
    "    def __setstate__(self, state):\n",
    "        \"\"\"\n",
    "        Restore the state and reinitialize the Chroma client and collection.\n",
    "        \"\"\"\n",
    "        self.__dict__.update(state)\n",
    "        self._init_chroma_client()\n",
    "\n",
    "    def _reformat(self, chroma_results: dict) -> list:\n",
    "        \"\"\"\n",
    "        Reformat Chroma DB results into a flat list of dictionaries.\n",
    "        \"\"\"\n",
    "        reformatted = []\n",
    "        metadatas = chroma_results.get(\"metadatas\", [])\n",
    "        documents = chroma_results.get(\"documents\", [])\n",
    "        distances = chroma_results.get(\"distances\", [])\n",
    "        \n",
    "        chunk_index = 1\n",
    "        for meta_group, doc_group, distance_group in zip(metadatas, documents, distances):\n",
    "            for meta, text, distance in zip(meta_group, doc_group, distance_group):\n",
    "                entry = {\n",
    "                    \"chunk_index\": chunk_index,\n",
    "                    \"chunk_id\": meta.get(\"chunk_id\"),\n",
    "                    \"doc_id\": meta.get(\"doc_id\"),\n",
    "                    \"page_number\": meta.get(\"page_number\"),\n",
    "                    \"source\": meta.get(\"source\"),\n",
    "                    \"text\": text,\n",
    "                    \"distance\": distance,\n",
    "                    \"score\": 1 - distance\n",
    "                }\n",
    "                reformatted.append(entry)\n",
    "                chunk_index += 1\n",
    "        \n",
    "        return reformatted\n",
    "\n",
    "    def _reformat_batch(self, chroma_results: dict) -> list:\n",
    "        \"\"\"\n",
    "        Reformat batch Chroma DB results into a list where each element corresponds\n",
    "        to a list of dictionaries for each query embedding.\n",
    "        \"\"\"\n",
    "        batch_results = []\n",
    "        metadatas = chroma_results.get(\"metadatas\", [])\n",
    "        documents = chroma_results.get(\"documents\", [])\n",
    "        distances = chroma_results.get(\"distances\", [])\n",
    "        \n",
    "        for meta_group, doc_group, distance_group in zip(metadatas, documents, distances):\n",
    "            formatted_results = []\n",
    "            chunk_index = 1  # Reset index for each query result.\n",
    "            for meta, text, distance in zip(meta_group, doc_group, distance_group):\n",
    "                entry = {\n",
    "                    \"chunk_index\": chunk_index,\n",
    "                    \"chunk_id\": meta.get(\"chunk_id\"),\n",
    "                    \"doc_id\": meta.get(\"doc_id\"),\n",
    "                    \"page_number\": meta.get(\"page_number\"),\n",
    "                    \"source\": meta.get(\"source\"),\n",
    "                    \"text\": text,\n",
    "                    \"distance\": distance,\n",
    "                    \"score\": 1 - distance\n",
    "                }\n",
    "                formatted_results.append(entry)\n",
    "                chunk_index += 1\n",
    "            batch_results.append(formatted_results)\n",
    "        \n",
    "        return batch_results\n",
    "\n",
    "    def _filter_by_score(self, results: list) -> list:\n",
    "        \"\"\"\n",
    "        Filter out results with a score lower than the specified threshold.\n",
    "        \"\"\"\n",
    "        return [result for result in results if result[\"score\"] >= self.score_threshold]\n",
    "\n",
    "    def query(self, query_embedding, n_results: int = 3) -> list:\n",
    "        \"\"\"\n",
    "        Query the Chroma collection for the top similar documents based on the provided embedding.\n",
    "        The results are filtered based on the score threshold.\n",
    "\n",
    "        Parameters:\n",
    "            query_embedding (list or np.ndarray): The input embedding vector.\n",
    "            n_results (int): Number of top similar results to return.\n",
    "\n",
    "        Returns:\n",
    "            list: A list of formatted and filtered search result dictionaries.\n",
    "        \"\"\"\n",
    "        # Convert numpy array to list if necessary.\n",
    "        if isinstance(query_embedding, np.ndarray):\n",
    "            query_embedding = query_embedding.tolist()\n",
    "\n",
    "        results = self.collection.query(\n",
    "            query_embeddings=query_embedding,\n",
    "            n_results=n_results,\n",
    "            include=[\"documents\", \"metadatas\", \"distances\"]\n",
    "        )\n",
    "        \n",
    "        formatted_results = self._reformat(results)\n",
    "        filtered_results = self._filter_by_score(formatted_results)\n",
    "        return filtered_results\n",
    "\n",
    "    def query_batch(self, query_embeddings, n_results: int = 3) -> list:\n",
    "        \"\"\"\n",
    "        Query the Chroma collection for the top similar documents for a batch of embeddings.\n",
    "        Each query embedding in the input list returns its own set of results, filtered based on the score threshold.\n",
    "\n",
    "        Parameters:\n",
    "            query_embeddings (list): A list of embeddings (each as a list or np.ndarray).\n",
    "            n_results (int): Number of top similar results to return for each query embedding.\n",
    "\n",
    "        Returns:\n",
    "            list: A list where each element is a list of formatted and filtered search result dictionaries \n",
    "                  for the corresponding query embedding.\n",
    "        \"\"\"\n",
    "        # Process each embedding: if any is a numpy array, convert it to list.\n",
    "        processed_embeddings = [\n",
    "            emb.tolist() if isinstance(emb, np.ndarray) else emb\n",
    "            for emb in query_embeddings\n",
    "        ]\n",
    "        \n",
    "        # Query the collection with the batch of embeddings.\n",
    "        results = self.collection.query(\n",
    "            query_embeddings=processed_embeddings,\n",
    "            n_results=n_results,\n",
    "            include=[\"documents\", \"metadatas\", \"distances\"]\n",
    "        )\n",
    "        \n",
    "        # Reformat the results into batches.\n",
    "        batch_results = self._reformat_batch(results)\n",
    "        \n",
    "        # Filter each query's results based on the score threshold.\n",
    "        filtered_batch = [self._filter_by_score(results) for results in batch_results]\n",
    "        \n",
    "        return filtered_batch\n",
    "\n"
   ]
  },
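  {
   "cell_type": "markdown",
   "id": "e4b82f6a",
   "metadata": {},
   "source": [
    "The score filtering in `ChromaQuerier` can be illustrated on its own. The sketch below uses made-up distances and hypothetical helper names (`to_score`, `filter_by_score`); it mirrors the `score = 1 - distance` convention and the default threshold of 0.8 used above. Interpreting `1 - distance` as a similarity assumes the collection uses a cosine distance metric; with other metrics the threshold would likely need to be re-tuned.\n",
    "\n",
    "```python\n",
    "def to_score(distance):\n",
    "    # Mirrors the notebook's convention: score = 1 - distance.\n",
    "    return 1 - distance\n",
    "\n",
    "\n",
    "def filter_by_score(results, score_threshold=0.8):\n",
    "    # Keep only results whose score meets the threshold.\n",
    "    return [r for r in results if to_score(r['distance']) >= score_threshold]\n",
    "\n",
    "\n",
    "# Made-up distances, for illustration only.\n",
    "results = [\n",
    "    {'chunk_id': 'a', 'distance': 0.10},\n",
    "    {'chunk_id': 'b', 'distance': 0.19},\n",
    "    {'chunk_id': 'c', 'distance': 0.35},\n",
    "]\n",
    "\n",
    "kept = filter_by_score(results)\n",
    "print([r['chunk_id'] for r in kept])\n",
    "```\n",
    "\n",
    "With these inputs, chunks `a` and `b` pass the threshold and `c` is dropped. Raising `score_threshold` trades recall for precision, so it is worth checking the value against real queries."
   ]
  },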
  {
   "cell_type": "markdown",
   "id": "a10cfc82",
   "metadata": {},
   "source": [
    "## Render the basic RAG prompt\n",
    "\n",
    "Create a prompt that includes the retrieved context and the user’s question. This prompt guides the LLM to produce an answer that is informed by the context.\n",
    "\n",
    "We are using a bisc prompt from langchain's rag bot: https://python.langchain.com/docs/tutorials/rag/\n",
    "\n",
    "**Note:**\n",
    "The quality of the prompt is not optimal; in our next tutorial on prompt engineering, we will demonstrate how to improve it.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4a89d88f",
   "metadata": {},
   "outputs": [],
   "source": [
    "\n",
    "def render_basic_rag_prompt(user_request, context):\n",
    "    prompt = f\"\"\"Use the following pieces of context to answer the question at the end.\n",
    "If you don't know the answer, just say that you don't know, don't try to make up an answer.\n",
    "Use three sentences maximum and keep the answer as concise as possible.\n",
    "Always say \"thanks for asking!\" at the end of the answer.\n",
    "\n",
    "{context}\n",
    "\n",
    "Question: {user_request}\n",
    "\n",
    "Helpful Answer:\"\"\"\n",
    "    return prompt.strip()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "71d35a69",
   "metadata": {},
   "source": [
    "## Connect the Components of RAG\n",
    "\n",
    "Integrate the embedder, Chroma querier, and LLM service into a complete RAG pipeline.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "face0a76",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Query Results:\n",
      "[{'chunk_id': '55b8db89-0aa6-460c-ae3a-284241f5865c',\n",
      "  'chunk_index': 1,\n",
      "  'distance': 0.10053980350494385,\n",
      "  'doc_id': '7b170e3d-4081-4527-a737-1da022a364c4',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8994601964950562,\n",
      "  'source': 'anyscale-rag-application/100-docs/Jobs.txt',\n",
      "  'text': '2/12/25, 9:48 AM Jobs | Anyscale Docs Jobs Run discrete workloads '\n",
      "          'in production such as batch inference, bulk embeddings generation, '\n",
      "          'or model fine-tuning. Anyscale Jobs allow you to submit '\n",
      "          'applications developed on workspaces to a standalone Ray cluster '\n",
      "          'for execution. Built for production and designed to fit into your '\n",
      "          'CI/CD pipeline, jobs ensure scalable and reliable performance. How '\n",
      "          'does it work? # When you’re ready to promote an app to production, '\n",
      "          'submit a job from the workspace using anyscale job submit . '\n",
      "          'Anyscale Jobs have the following features: Scalability: Rapid '\n",
      "          'scaling to thousands of cloud instances, adjusting computing '\n",
      "          'resources to match application demand. Fault tolerance: Retries for '\n",
      "          'failures and automatic rescheduling to an alternative cluster for '\n",
      "          'unexpected failures like running out of memory. Monitoring and '\n",
      "          'observability: Persistent dashboards that allow you to observe '\n",
      "          'tasks in real time and email alerts upon successf ul job '\n",
      "          'completion. Get started 1. Sign in or sign up for an account. 2. '\n",
      "          'Select the Intro to Jobs example. 3. Select Launch. This example '\n",
      "          'runs in a Workspace. See Workspaces for background information. 4. '\n",
      "          'Follow the notebook or view it in the docs. 5. Terminate the '\n",
      "          \"Workspace when you're done. Ask AI \"\n",
      "          'https://docs.anyscale.com/platform/jobs/ 1/2 2/12/25, 9:48 AM Jobs '\n",
      "          '| Anyscale Docs https://docs.anyscale.com/platform/jobs/ 2/2'},\n",
      " {'chunk_id': 'fdd75966-04d9-43d2-8e3e-b85e8387d086',\n",
      "  'chunk_index': 2,\n",
      "  'distance': 0.11302393674850464,\n",
      "  'doc_id': '82efb8cd-2181-47cb-9389-ff101cc68674',\n",
      "  'page_number': 2,\n",
      "  'score': 0.8869760632514954,\n",
      "  'source': 'anyscale-rag-application/100-docs/Create_and_manage_jobs.pdf',\n",
      "  'text': '2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Defining a '\n",
      "          'job With the CLI, you can define jobs in a YAML file and submit '\n",
      "          'them by referencing the YAML: anyscale job submit --config-file '\n",
      "          'config.yaml For an example of defining a job in a YAML, see the '\n",
      "          'reference docs. Waiting on a job You can block CLI and SDK commands '\n",
      "          'until a job enters a specified state. By default, '\n",
      "          'JobState.SUCCEEDED is used. See all available states in the '\n",
      "          'reference docs. CLI Python SDK anyscale job wait -n job-wait When '\n",
      "          'you submit a job, you can specify --wait , which waits for the job '\n",
      "          'to succeed or exits if the job fails. anyscale job submit -n '\n",
      "          'job-wait --wait -- sleep 30 For more information on submitting jobs '\n",
      "          'with the CLI, see the reference docs. Terminating a job You can '\n",
      "          'terminate a job from the Job page or using the CLI/SDK: CLI Python '\n",
      "          'SDK https://docs.anyscale.com/platform/jobs/manage-jobs 2/5'},\n",
      " {'chunk_id': '23bf61db-155a-4a1a-a74f-669409c92303',\n",
      "  'chunk_index': 3,\n",
      "  'distance': 0.11444741487503052,\n",
      "  'doc_id': '91ddd49b-9e23-4e58-9143-bbac61ee2157',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8855525851249695,\n",
      "  'source': 'anyscale-rag-application/100-docs/Job_schedules.html',\n",
      "  'text': 'anyscale.job.terminate(name=\"my-job\") For more information on '\n",
      "          'terminating jobs with the SDK, see the reference docs. Archiving a '\n",
      "          'job\\u200b Archiving jobs hide them from the job list page, but you '\n",
      "          'can still access them through the CLI and SDK. The cluster '\n",
      "          'associated with an archived job is archived automatically. To be '\n",
      "          'archived, jobs must be in a terminal state. You must have created '\n",
      "          'the job or be an organization admin to archive the job. You can '\n",
      "          'archive jobs in Anyscale console or through the CLI/SDK: CLI Python '\n",
      "          \"SDK anyscale job archive --id 'prodjob_...' For more information on \"\n",
      "          'archiving jobs with the CLI, see the reference docs. import '\n",
      "          'anyscale\\n'\n",
      "          '\\n'\n",
      "          'anyscale.job.archive(name=\"my-job\") For more information on '\n",
      "          'archiving jobs with the SDK, see the reference docs. Managing '\n",
      "          'dependencies\\u200b When developing Anyscale jobs, you may need to '\n",
      "          'include additional Python packages or system-level dependencies. '\n",
      "          'There are several ways to manage these dependencies: Using a '\n",
      "          'requirements.txt file\\u200b The simplest way to manage Python '\n",
      "          'package dependencies is by using a requirements.txt file. Create a '\n",
      "          'requirements.txt file in your project directory: emoji==2.12.1\\n'\n",
      "          'numpy==1.21.0 When submitting your job, include the -r or '\n",
      "          '--requirements flag: CLI Python SDK anyscale job submit '\n",
      "          '--config-file job.yaml -r ./requirements.txt import anyscale\\n'\n",
      "          'from anyscale.job.models import JobConfig\\n'\n",
      "          '\\n'\n",
      "          'config = JobConfig(\\n'\n",
      "          '    name=\"my-job\",\\n'\n",
      "          '    entrypoint=\"python main.py\",\\n'\n",
      "          '    working_dir=\".\",\\n'\n",
      "          '    requirements=\"./requirements.txt\"\\n'\n",
      "          ')\\n'\n",
      "          '\\n'\n",
      "          'anyscale.job.submit(config) This method works well for '\n",
      "          'straightforward Python package dependencies. Anyscale installs '\n",
      "          \"these packages in the job's environment before running your code. \"\n",
      "          'Using a custom container\\u200b For more complex dependency '\n",
      "          'management, including system-level packages or specific environment '\n",
      "          'configurations, use a custom container: Create a Dockerfile: FROM '\n",
      "          'anyscale/ray:2.10.0-py310\\n'\n",
      "          '\\n'\n",
      "          '# Install system dependencies if needed\\n'\n",
      "          'RUN apt-get update && apt-get install -y <your-system-packages>'},\n",
      " {'chunk_id': '13683649-bcdc-444d-866f-36cafcb132c9',\n",
      "  'chunk_index': 4,\n",
      "  'distance': 0.11574643850326538,\n",
      "  'doc_id': '91ddd49b-9e23-4e58-9143-bbac61ee2157',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8842535614967346,\n",
      "  'source': 'anyscale-rag-application/100-docs/Job_schedules.html',\n",
      "  'text': 'Create and manage jobs Submitting a job\\u200b To submit your job to '\n",
      "          'Anyscale, use the Python SDK or CLI and pass in any additional '\n",
      "          'options or configurations for the job. By default, Anyscale uses '\n",
      "          'your workspace or cloud to provision a cluster to run your job. You '\n",
      "          'can define a custom cluster through a compute config or specify an '\n",
      "          'existing cluster. Once submitted, Anyscale runs the job as '\n",
      "          'specified in the entrypoint command, which is typically a Ray Job. '\n",
      "          \"If the run doesn't succeed, the job restarts using the same \"\n",
      "          'entrypoint up to the number of max_retries. CLI Python SDK anyscale '\n",
      "          'job submit --name=my-job \\\\\\n'\n",
      "          '  --working-dir=. --max-retries=5 \\\\\\n'\n",
      "          '  --image-uri=\"anyscale/image/IMAGE_NAME:VERSION\" \\\\\\n'\n",
      "          '  --compute-config=COMPUTE_CONFIG_NAME \\\\\\n'\n",
      "          '  -- python main.py With the CLI, you can either specify an '\n",
      "          'existing compute config with --compute-config=COMPUTE_CONFIG_NAME '\n",
      "          'or define a new one in a job YAML. For more information on '\n",
      "          'submitting jobs with the CLI, see the reference docs. import '\n",
      "          'anyscale\\n'\n",
      "          'from anyscale.job.models import JobConfig\\n'\n",
      "          '\\n'\n",
      "          'config = JobConfig(\\n'\n",
      "          '  name=\"my-job\",\\n'\n",
      "          '  entrypoint=\"python main.py\",\\n'\n",
      "          '  working_dir=\".\",\\n'\n",
      "          '  max_retries=5,\\n'\n",
      "          '  image_uri=\"anyscale/image/IMAGE_NAME:VERSION\",\\n'\n",
      "          '  compute_config=\"COMPUTE_CONFIG_NAME\"\\n'\n",
      "          ')'},\n",
      " {'chunk_id': '3167ba0b-2aff-4f69-8dbe-7a8cdc725005',\n",
      "  'chunk_index': 5,\n",
      "  'distance': 0.11749774217605591,\n",
      "  'doc_id': '7a7730fa-96a1-4775-896a-11deac94d668',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8825022578239441,\n",
      "  'source': 'anyscale-rag-application/100-docs/Job_queues.pptx',\n",
      "  'text': '2/12/25, 9:48 AM\\tJob queues | Anyscale Docs Job queues A job queue '\n",
      "          'enables sophisticated scheduling and execution algorithms for '\n",
      "          'Anyscale Jobs. This feature improves resource utilization and '\n",
      "          'reduces provisioning times by enabling multiple jobs to share a '\n",
      "          'single cluster. Anyscale supports flexible scheduling algorithms, '\n",
      "          'including FIFO (first-in, first-out), LIFO (last-in, first-out), '\n",
      "          'and priority-based scheduling. Job processing Anyscale job queues '\n",
      "          'optimize resource utilization and throughput by using sophisticated '\n",
      "          'scheduling to run multiple jobs on the same cluster. Submission: '\n",
      "          'The typical Anyscale Job submission workflow adds the job to the '\n",
      "          'specified queue. Scheduling: Based on the scheduling policy, '\n",
      "          'Anyscale determines ordering of the jobs in the queue and picks '\n",
      "          'jobs at the top of the queue for scheduling. Anyscale schedules no '\n",
      "          'more than the specified max-concurrency jobs for running on a '\n",
      "          'cluster at the same time. Execution: Jobs run until completion, '\n",
      "          'including retries up to the specified number of max_retries . '\n",
      "          'Anyscale provisions a cluster when you submit the first job in a '\n",
      "          'queue, and continues running until there are no more jobs in the '\n",
      "          'queue and it idles. Create a job queue Creating a job queue is '\n",
      "          'similar to creating a standalone Anyscale Job. In your job.yaml '\n",
      "          'file, specify additional job queue configurations: CLI\\tPython SDK '\n",
      "          'Ask AI https://docs.anyscale.com/platform/jobs/job-queues 1/5'},\n",
      " {'chunk_id': '8c158e24-22e2-4984-a402-2331cf0896fc',\n",
      "  'chunk_index': 6,\n",
      "  'distance': 0.12043756246566772,\n",
      "  'doc_id': '82efb8cd-2181-47cb-9389-ff101cc68674',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8795624375343323,\n",
      "  'source': 'anyscale-rag-application/100-docs/Create_and_manage_jobs.pdf',\n",
      "  'text': '2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Create and '\n",
      "          'manage jobs Submitting a job To submit your job to Anyscale, use '\n",
      "          'the Python SDK or CLI and pass in any additional options or '\n",
      "          'configurations for the job. By default, Anyscale uses your '\n",
      "          'workspace or cloud to provision a cluster to run your job. You can '\n",
      "          'define a custom cluster through a compute config or specify an '\n",
      "          'existing cluster. Once submitted, Anyscale runs the job as '\n",
      "          'specified in the entrypoint command, which is typically a Ray Job. '\n",
      "          \"If the run doesn't succeed, the job restarts using the same \"\n",
      "          'entrypoint up to the number of max_retries . CLI Python SDK '\n",
      "          'anyscale job submit --name=my-job \\\\ --working-dir=. '\n",
      "          '--max-retries=5 \\\\ --image-uri=\"anyscale/image/IMAGE_NAME:VERSION\" '\n",
      "          '\\\\ --compute-config=COMPUTE_CONFIG_NAME \\\\ -- python main.py With '\n",
      "          'the CLI, you can either specify an existing compute config with '\n",
      "          '--compute- config=COMPUTE_CONFIG_NAME or define a new one in a job '\n",
      "          'YAML. For more information on submitting jobs with the CLI, see the '\n",
      "          'reference docs. TIP For large-scale, compute-intensive jobs, avoid '\n",
      "          'scheduling Ray tasks onto the head node because it manages '\n",
      "          'cluster-level orchestration. To do that, set the CPU resource on '\n",
      "          'the head node to 0 in your compute config. Ask AI '\n",
      "          'https://docs.anyscale.com/platform/jobs/manage-jobs 1/5'},\n",
      " {'chunk_id': '2d0623d0-911f-471a-9649-a61c0b2db411',\n",
      "  'chunk_index': 7,\n",
      "  'distance': 0.12314975261688232,\n",
      "  'doc_id': '82efb8cd-2181-47cb-9389-ff101cc68674',\n",
      "  'page_number': 5,\n",
      "  'score': 0.8768502473831177,\n",
      "  'source': 'anyscale-rag-application/100-docs/Create_and_manage_jobs.pdf',\n",
      "  'text': '2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Using '\n",
      "          'pre-built custom images For frequently used environments, you can '\n",
      "          'build and reuse custom images: 1. Build the image: CLI Python SDK '\n",
      "          'anyscale image build -n my-custom-image --containerfile Dockerfile '\n",
      "          '2. Use the built image in your job submission: CLI Python SDK '\n",
      "          'anyscale job submit --config-file job.yaml --image-uri '\n",
      "          'anyscale/image/my- custom-image:1 This approach is efficient for '\n",
      "          'teams working on multiple jobs that share the same dependencies. '\n",
      "          'https://docs.anyscale.com/platform/jobs/manage-jobs 5/5'},\n",
      " {'chunk_id': '96285d69-2f27-4fb6-81be-31a4ededffba',\n",
      "  'chunk_index': 8,\n",
      "  'distance': 0.1248595118522644,\n",
      "  'doc_id': '82efb8cd-2181-47cb-9389-ff101cc68674',\n",
      "  'page_number': 3,\n",
      "  'score': 0.8751404881477356,\n",
      "  'source': 'anyscale-rag-application/100-docs/Create_and_manage_jobs.pdf',\n",
      "  'text': '2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs anyscale '\n",
      "          \"job terminate --id 'prodjob_...' For more information on \"\n",
      "          'terminating jobs with the CLI, see the reference docs. Archiving a '\n",
      "          'job Archiving jobs hide them from the job list page, but you can '\n",
      "          'still access them through the CLI and SDK. The cluster associated '\n",
      "          'with an archived job is archived automatically. To be archived, '\n",
      "          'jobs must be in a terminal state. You must have created the job or '\n",
      "          'be an organization admin to archive the job. You can archive jobs '\n",
      "          'in Anyscale console or through the CLI/SDK: CLI Python SDK anyscale '\n",
      "          \"job archive --id 'prodjob_...' For more information on archiving \"\n",
      "          'jobs with the CLI, see the reference docs. Managing dependencies '\n",
      "          'When developing Anyscale jobs, you may need to include additional '\n",
      "          'Python packages or system- level dependencies. There are several '\n",
      "          'ways to manage these dependencies: Using a requirements.txt file '\n",
      "          'The simplest way to manage Python package dependencies is by using '\n",
      "          'a requirements.txt file. 1. Create a requirements.txt file in your '\n",
      "          'project directory: emoji==2.12.1 numpy==1.21.0 '\n",
      "          'https://docs.anyscale.com/platform/jobs/manage-jobs 3/5'},\n",
      " {'chunk_id': 'd7c57ead-0228-4c49-bb77-37b67aed463d',\n",
      "  'chunk_index': 9,\n",
      "  'distance': 0.12725389003753662,\n",
      "  'doc_id': '91ddd49b-9e23-4e58-9143-bbac61ee2157',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8727461099624634,\n",
      "  'source': 'anyscale-rag-application/100-docs/Job_schedules.html',\n",
      "  'text': 'config = JobConfig(\\n'\n",
      "          '  name=\"my-job\",\\n'\n",
      "          '  entrypoint=\"python main.py\",\\n'\n",
      "          '  working_dir=\".\",\\n'\n",
      "          '  max_retries=5,\\n'\n",
      "          '  image_uri=\"anyscale/image/IMAGE_NAME:VERSION\",\\n'\n",
      "          '  compute_config=\"COMPUTE_CONFIG_NAME\"\\n'\n",
      "          ')\\n'\n",
      "          '\\n'\n",
      "          'anyscale.job.submit(config) With the SDK, you can either specify an '\n",
      "          'existing compute config or define a new one using the compute '\n",
      "          'config API. For more information on submitting jobs with the SDK, '\n",
      "          'see the reference docs. For a complete list of supported options '\n",
      "          'defining a JobConfig, see the reference docs for JobConfig. tip For '\n",
      "          'large-scale, compute-intensive jobs, avoid scheduling Ray tasks '\n",
      "          'onto the head node because it manages cluster-level orchestration. '\n",
      "          'To do that, set the CPU resource on the head node to 0 in your '\n",
      "          'compute config. Defining a job\\u200b With the CLI, you can define '\n",
      "          'jobs in a YAML file and submit them by referencing the YAML: '\n",
      "          'anyscale job submit --config-file config.yaml For an example of '\n",
      "          'defining a job in a YAML, see the reference docs. Waiting on a '\n",
      "          'job\\u200b You can block CLI and SDK commands until a job enters a '\n",
      "          'specified state. By default, JobState.SUCCEEDED is used. See all '\n",
      "          'available states in the reference docs. CLI Python SDK anyscale job '\n",
      "          'wait -n job-wait When you submit a job, you can specify --wait, '\n",
      "          'which waits for the job to succeed or exits if the job fails. '\n",
      "          'anyscale job submit -n job-wait --wait -- sleep 30 For more '\n",
      "          'information on submitting jobs with the CLI, see the reference '\n",
      "          'docs. import anyscale\\n'\n",
      "          'from anyscale.job.models import JobConfig\\n'\n",
      "          '\\n'\n",
      "          'config = JobConfig(name=\"job-wait\", entrypoint=\"sleep 30\")\\n'\n",
      "          '\\n'\n",
      "          'anyscale.job.submit(config)\\n'\n",
      "          'anyscale.job.wait(name=\"job-wait\") For more information on '\n",
      "          'submitting jobs with the SDK, see the reference docs. Terminating a '\n",
      "          'job\\u200b You can terminate a job from the Job page or using the '\n",
      "          \"CLI/SDK: CLI Python SDK anyscale job terminate --id 'prodjob_...' \"\n",
      "          'For more information on terminating jobs with the CLI, see the '\n",
      "          'reference docs. import anyscale'},\n",
      " {'chunk_id': 'f1e981ec-d129-49a0-8df6-86fc1da99d7f',\n",
      "  'chunk_index': 10,\n",
      "  'distance': 0.1282671093940735,\n",
      "  'doc_id': 'e747a479-126c-42ed-9f20-f84038229e7b',\n",
      "  'page_number': 1,\n",
      "  'score': 0.8717328906059265,\n",
      "  'source': 'anyscale-rag-application/100-docs/Monitor_a_job.docx',\n",
      "  'text': 'to look back. Anyscale stores up to 30 days of logs for your job. '\n",
      "          \"You're able to debug issues even after the job terminates. To \"\n",
      "          'filter the logs, use the search bar to search for specific '\n",
      "          'keywords. Enter a request ID in the search bar to filter logs for a '\n",
      "          'specific request. You can also use contain a specific pattern. '\n",
      "          'Alerts to filter logs if your logs Anyscale jobs have a built-in '\n",
      "          'alert for when a job succeeds or fails. The creator of the job '\n",
      "          'receives an email notification when the job completes. To set up '\n",
      "          'additional alerts based on your own criteria, see Custom dashboards '\n",
      "          'and alerting guide. These alerts are useful for tracking the health '\n",
      "          'of your jobs or job queues. Ray Dashboard The Ray Dashboard is '\n",
      "          'scoped to a single Ray cluster. Each job attempt launches a new Ray '\n",
      "          'cluster unless Job queues are used. To access this dashboard, click '\n",
      "          'the \"Ray Dashboard\" tab in the job detail page. To learn more about '\n",
      "          'how to use the Ray Dashboard, see the Ray documentation. Exporting '\n",
      "          'logs and metrics If you want to push logs to Vector, a tool to ship '\n",
      "          'logs to Amazon CloudWatch, Google Cloud Monitoring, Datadog, or '\n",
      "          'other observability tools, see Exporting logs and metrics with '\n",
      "          'Vector. More info To learn more details about the Ray Dashboard, '\n",
      "          'see the Ray Dashboard documentation To learn more about Grafana and '\n",
      "          'how to use it, see the official Grafana documentation To learn more '\n",
      "          'about the metrics that Ray emits, see the System Metrics '\n",
      "          'documentation'}]\n",
      "Rendered Prompt:\n",
      "('Use the following pieces of context to answer the question at the end.\\n'\n",
      " \"If you don't know the answer, just say that you don't know, don't try to \"\n",
      " 'make up an answer.\\n'\n",
      " 'Use three sentences maximum and keep the answer as concise as possible.\\n'\n",
      " 'Always say \"thanks for asking!\" at the end of the answer.\\n'\n",
      " '\\n'\n",
      " \"[{'chunk_index': 1, 'chunk_id': '55b8db89-0aa6-460c-ae3a-284241f5865c', \"\n",
      " \"'doc_id': '7b170e3d-4081-4527-a737-1da022a364c4', 'page_number': 1, \"\n",
      " \"'source': 'anyscale-rag-application/100-docs/Jobs.txt', 'text': \"\n",
      " '\"2/12/25, 9:48 AM Jobs | Anyscale Docs Jobs Run discrete workloads in '\n",
      " 'production such as batch inference, bulk embeddings generation, or model '\n",
      " 'fine-tuning. Anyscale Jobs allow you to submit applications developed on '\n",
      " 'workspaces to a standalone Ray cluster for execution. Built for production '\n",
      " 'and designed to fit into your CI/CD pipeline, jobs ensure scalable and '\n",
      " 'reliable performance. How does it work? # When you’re ready to promote an '\n",
      " 'app to production, submit a job from the workspace using anyscale job submit '\n",
      " '. Anyscale Jobs have the following features: Scalability: Rapid scaling to '\n",
      " 'thousands of cloud instances, adjusting computing resources to match '\n",
      " 'application demand. Fault tolerance: Retries for failures and automatic '\n",
      " 'rescheduling to an alternative cluster for unexpected failures like running '\n",
      " 'out of memory. Monitoring and observability: Persistent dashboards that '\n",
      " 'allow you to observe tasks in real time and email alerts upon successf ul '\n",
      " 'job completion. Get started 1. Sign in or sign up for an account. 2. Select '\n",
      " 'the Intro to Jobs example. 3. Select Launch. This example runs in a '\n",
      " 'Workspace. See Workspaces for background information. 4. Follow the notebook '\n",
      " \"or view it in the docs. 5. Terminate the Workspace when you're done. Ask AI \"\n",
      " 'https://docs.anyscale.com/platform/jobs/ 1/2 2/12/25, 9:48 AM Jobs | '\n",
      " 'Anyscale Docs https://docs.anyscale.com/platform/jobs/ 2/2\", \\'distance\\': '\n",
      " \"0.10053980350494385, 'score': 0.8994601964950562}, {'chunk_index': 2, \"\n",
      " \"'chunk_id': 'fdd75966-04d9-43d2-8e3e-b85e8387d086', 'doc_id': \"\n",
      " \"'82efb8cd-2181-47cb-9389-ff101cc68674', 'page_number': 2, 'source': \"\n",
      " \"'anyscale-rag-application/100-docs/Create_and_manage_jobs.pdf', 'text': \"\n",
      " \"'2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Defining a job With \"\n",
      " 'the CLI, you can define jobs in a YAML file and submit them by referencing '\n",
      " 'the YAML: anyscale job submit --config-file config.yaml For an example of '\n",
      " 'defining a job in a YAML, see the reference docs. Waiting on a job You can '\n",
      " 'block CLI and SDK commands until a job enters a specified state. By default, '\n",
      " 'JobState.SUCCEEDED is used. See all available states in the reference docs. '\n",
      " 'CLI Python SDK anyscale job wait -n job-wait When you submit a job, you can '\n",
      " 'specify --wait , which waits for the job to succeed or exits if the job '\n",
      " 'fails. anyscale job submit -n job-wait --wait -- sleep 30 For more '\n",
      " 'information on submitting jobs with the CLI, see the reference docs. '\n",
      " 'Terminating a job You can terminate a job from the Job page or using the '\n",
      " 'CLI/SDK: CLI Python SDK https://docs.anyscale.com/platform/jobs/manage-jobs '\n",
      " \"2/5', 'distance': 0.11302393674850464, 'score': 0.8869760632514954}, \"\n",
      " \"{'chunk_index': 3, 'chunk_id': '23bf61db-155a-4a1a-a74f-669409c92303', \"\n",
      " \"'doc_id': '91ddd49b-9e23-4e58-9143-bbac61ee2157', 'page_number': 1, \"\n",
      " \"'source': 'anyscale-rag-application/100-docs/Job_schedules.html', 'text': \"\n",
      " '\\'anyscale.job.terminate(name=\"my-job\") For more information on terminating '\n",
      " 'jobs with the SDK, see the reference docs. Archiving a job\\\\u200b Archiving '\n",
      " 'jobs hide them from the job list page, but you can still access them through '\n",
      " 'the CLI and SDK. The cluster associated with an archived job is archived '\n",
      " 'automatically. To be archived, jobs must be in a terminal state. You must '\n",
      " 'have created the job or be an organization admin to archive the job. You can '\n",
      " 'archive jobs in Anyscale console or through the CLI/SDK: CLI Python SDK '\n",
      " \"anyscale job archive --id \\\\'prodjob_...\\\\' For more information on \"\n",
      " 'archiving jobs with the CLI, see the reference docs. import '\n",
      " 'anyscale\\\\n\\\\nanyscale.job.archive(name=\"my-job\") For more information on '\n",
      " 'archiving jobs with the SDK, see the reference docs. Managing '\n",
      " 'dependencies\\\\u200b When developing Anyscale jobs, you may need to include '\n",
      " 'additional Python packages or system-level dependencies. There are several '\n",
      " 'ways to manage these dependencies: Using a requirements.txt file\\\\u200b The '\n",
      " 'simplest way to manage Python package dependencies is by using a '\n",
      " 'requirements.txt file. Create a requirements.txt file in your project '\n",
      " 'directory: emoji==2.12.1\\\\nnumpy==1.21.0 When submitting your job, include '\n",
      " 'the -r or --requirements flag: CLI Python SDK anyscale job submit '\n",
      " '--config-file job.yaml -r ./requirements.txt import anyscale\\\\nfrom '\n",
      " 'anyscale.job.models import JobConfig\\\\n\\\\nconfig = JobConfig(\\\\n    '\n",
      " 'name=\"my-job\",\\\\n    entrypoint=\"python main.py\",\\\\n    '\n",
      " 'working_dir=\".\",\\\\n    '\n",
      " 'requirements=\"./requirements.txt\"\\\\n)\\\\n\\\\nanyscale.job.submit(config) This '\n",
      " 'method works well for straightforward Python package dependencies. Anyscale '\n",
      " \"installs these packages in the job\\\\'s environment before running your code. \"\n",
      " 'Using a custom container\\\\u200b For more complex dependency management, '\n",
      " 'including system-level packages or specific environment configurations, use '\n",
      " 'a custom container: Create a Dockerfile: FROM '\n",
      " 'anyscale/ray:2.10.0-py310\\\\n\\\\n# Install system dependencies if needed\\\\nRUN '\n",
      " \"apt-get update && apt-get install -y <your-system-packages>', 'distance': \"\n",
      " \"0.11444741487503052, 'score': 0.8855525851249695}, {'chunk_index': 4, \"\n",
      " \"'chunk_id': '13683649-bcdc-444d-866f-36cafcb132c9', 'doc_id': \"\n",
      " \"'91ddd49b-9e23-4e58-9143-bbac61ee2157', 'page_number': 1, 'source': \"\n",
      " \"'anyscale-rag-application/100-docs/Job_schedules.html', 'text': 'Create and \"\n",
      " 'manage jobs Submitting a job\\\\u200b To submit your job to Anyscale, use the '\n",
      " 'Python SDK or CLI and pass in any additional options or configurations for '\n",
      " 'the job. By default, Anyscale uses your workspace or cloud to provision a '\n",
      " 'cluster to run your job. You can define a custom cluster through a compute '\n",
      " 'config or specify an existing cluster. Once submitted, Anyscale runs the job '\n",
      " 'as specified in the entrypoint command, which is typically a Ray Job. If the '\n",
      " \"run doesn\\\\'t succeed, the job restarts using the same entrypoint up to the \"\n",
      " 'number of max_retries. CLI Python SDK anyscale job submit --name=my-job '\n",
      " '\\\\\\\\\\\\n  --working-dir=. --max-retries=5 \\\\\\\\\\\\n  '\n",
      " '--image-uri=\"anyscale/image/IMAGE_NAME:VERSION\" \\\\\\\\\\\\n  '\n",
      " '--compute-config=COMPUTE_CONFIG_NAME \\\\\\\\\\\\n  -- python main.py With the '\n",
      " 'CLI, you can either specify an existing compute config with '\n",
      " '--compute-config=COMPUTE_CONFIG_NAME or define a new one in a job YAML. For '\n",
      " 'more information on submitting jobs with the CLI, see the reference docs. '\n",
      " 'import anyscale\\\\nfrom anyscale.job.models import JobConfig\\\\n\\\\nconfig = '\n",
      " 'JobConfig(\\\\n  name=\"my-job\",\\\\n  entrypoint=\"python main.py\",\\\\n  '\n",
      " 'working_dir=\".\",\\\\n  max_retries=5,\\\\n  '\n",
      " 'image_uri=\"anyscale/image/IMAGE_NAME:VERSION\",\\\\n  '\n",
      " 'compute_config=\"COMPUTE_CONFIG_NAME\"\\\\n)\\', \\'distance\\': '\n",
      " \"0.11574643850326538, 'score': 0.8842535614967346}, {'chunk_index': 5, \"\n",
      " \"'chunk_id': '3167ba0b-2aff-4f69-8dbe-7a8cdc725005', 'doc_id': \"\n",
      " \"'7a7730fa-96a1-4775-896a-11deac94d668', 'page_number': 1, 'source': \"\n",
      " \"'anyscale-rag-application/100-docs/Job_queues.pptx', 'text': '2/12/25, 9:48 \"\n",
      " 'AM\\\\tJob queues | Anyscale Docs Job queues A job queue enables sophisticated '\n",
      " 'scheduling and execution algorithms for Anyscale Jobs. This feature improves '\n",
      " 'resource utilization and reduces provisioning times by enabling multiple '\n",
      " 'jobs to share a single cluster. Anyscale supports flexible scheduling '\n",
      " 'algorithms, including FIFO (first-in, first-out), LIFO (last-in, first-out), '\n",
      " 'and priority-based scheduling. Job processing Anyscale job queues optimize '\n",
      " 'resource utilization and throughput by using sophisticated scheduling to run '\n",
      " 'multiple jobs on the same cluster. Submission: The typical Anyscale Job '\n",
      " 'submission workflow adds the job to the specified queue. Scheduling: Based '\n",
      " 'on the scheduling policy, Anyscale determines ordering of the jobs in the '\n",
      " 'queue and picks jobs at the top of the queue for scheduling. Anyscale '\n",
      " 'schedules no more than the specified max-concurrency jobs for running on a '\n",
      " 'cluster at the same time. Execution: Jobs run until completion, including '\n",
      " 'retries up to the specified number of max_retries . Anyscale provisions a '\n",
      " 'cluster when you submit the first job in a queue, and continues running '\n",
      " 'until there are no more jobs in the queue and it idles. Create a job queue '\n",
      " 'Creating a job queue is similar to creating a standalone Anyscale Job. In '\n",
      " 'your job.yaml file, specify additional job queue configurations: '\n",
      " 'CLI\\\\tPython SDK Ask AI https://docs.anyscale.com/platform/jobs/job-queues '\n",
      " \"1/5', 'distance': 0.11749774217605591, 'score': 0.8825022578239441}, \"\n",
      " \"{'chunk_index': 6, 'chunk_id': '8c158e24-22e2-4984-a402-2331cf0896fc', \"\n",
      " \"'doc_id': '82efb8cd-2181-47cb-9389-ff101cc68674', 'page_number': 1, \"\n",
      " \"'source': 'anyscale-rag-application/100-docs/Create_and_manage_jobs.pdf', \"\n",
      " \"'text': '2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Create and \"\n",
      " 'manage jobs Submitting a job To submit your job to Anyscale, use the Python '\n",
      " 'SDK or CLI and pass in any additional options or configurations for the job. '\n",
      " 'By default, Anyscale uses your workspace or cloud to provision a cluster to '\n",
      " 'run your job. You can define a custom cluster through a compute config or '\n",
      " 'specify an existing cluster. Once submitted, Anyscale runs the job as '\n",
      " 'specified in the entrypoint command, which is typically a Ray Job. If the '\n",
      " \"run doesn\\\\'t succeed, the job restarts using the same entrypoint up to the \"\n",
      " 'number of max_retries . CLI Python SDK anyscale job submit --name=my-job '\n",
      " '\\\\\\\\ --working-dir=. --max-retries=5 \\\\\\\\ '\n",
      " '--image-uri=\"anyscale/image/IMAGE_NAME:VERSION\" \\\\\\\\ '\n",
      " '--compute-config=COMPUTE_CONFIG_NAME \\\\\\\\ -- python main.py With the CLI, '\n",
      " 'you can either specify an existing compute config with --compute- '\n",
      " 'config=COMPUTE_CONFIG_NAME or define a new one in a job YAML. For more '\n",
      " 'information on submitting jobs with the CLI, see the reference docs. TIP For '\n",
      " 'large-scale, compute-intensive jobs, avoid scheduling Ray tasks onto the '\n",
      " 'head node because it manages cluster-level orchestration. To do that, set '\n",
      " 'the CPU resource on the head node to 0 in your compute config. Ask AI '\n",
      " \"https://docs.anyscale.com/platform/jobs/manage-jobs 1/5', 'distance': \"\n",
      " \"0.12043756246566772, 'score': 0.8795624375343323}, {'chunk_index': 7, \"\n",
      " \"'chunk_id': '2d0623d0-911f-471a-9649-a61c0b2db411', 'doc_id': \"\n",
      " \"'82efb8cd-2181-47cb-9389-ff101cc68674', 'page_number': 5, 'source': \"\n",
      " \"'anyscale-rag-application/100-docs/Create_and_manage_jobs.pdf', 'text': \"\n",
      " \"'2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs Using pre-built \"\n",
      " 'custom images For frequently used environments, you can build and reuse '\n",
      " 'custom images: 1. Build the image: CLI Python SDK anyscale image build -n '\n",
      " 'my-custom-image --containerfile Dockerfile 2. Use the built image in your '\n",
      " 'job submission: CLI Python SDK anyscale job submit --config-file job.yaml '\n",
      " '--image-uri anyscale/image/my- custom-image:1 This approach is efficient for '\n",
      " 'teams working on multiple jobs that share the same dependencies. '\n",
      " \"https://docs.anyscale.com/platform/jobs/manage-jobs 5/5', 'distance': \"\n",
      " \"0.12314975261688232, 'score': 0.8768502473831177}, {'chunk_index': 8, \"\n",
      " \"'chunk_id': '96285d69-2f27-4fb6-81be-31a4ededffba', 'doc_id': \"\n",
      " \"'82efb8cd-2181-47cb-9389-ff101cc68674', 'page_number': 3, 'source': \"\n",
      " \"'anyscale-rag-application/100-docs/Create_and_manage_jobs.pdf', 'text': \"\n",
      " '\"2/12/25, 9:48 AM Create and manage jobs | Anyscale Docs anyscale job '\n",
      " \"terminate --id 'prodjob_...' For more information on terminating jobs with \"\n",
      " 'the CLI, see the reference docs. Archiving a job Archiving jobs hide them '\n",
      " 'from the job list page, but you can still access them through the CLI and '\n",
      " 'SDK. The cluster associated with an archived job is archived automatically. '\n",
      " 'To be archived, jobs must be in a terminal state. You must have created the '\n",
      " 'job or be an organization admin to archive the job. You can archive jobs in '\n",
      " 'Anyscale console or through the CLI/SDK: CLI Python SDK anyscale job archive '\n",
      " \"--id 'prodjob_...' For more information on archiving jobs with the CLI, see \"\n",
      " 'the reference docs. Managing dependencies When developing Anyscale jobs, you '\n",
      " 'may need to include additional Python packages or system- level '\n",
      " 'dependencies. There are several ways to manage these dependencies: Using a '\n",
      " 'requirements.txt file The simplest way to manage Python package dependencies '\n",
      " 'is by using a requirements.txt file. 1. Create a requirements.txt file in '\n",
      " 'your project directory: emoji==2.12.1 numpy==1.21.0 '\n",
      " 'https://docs.anyscale.com/platform/jobs/manage-jobs 3/5\", \\'distance\\': '\n",
      " \"0.1248595118522644, 'score': 0.8751404881477356}, {'chunk_index': 9, \"\n",
      " \"'chunk_id': 'd7c57ead-0228-4c49-bb77-37b67aed463d', 'doc_id': \"\n",
      " \"'91ddd49b-9e23-4e58-9143-bbac61ee2157', 'page_number': 1, 'source': \"\n",
      " \"'anyscale-rag-application/100-docs/Job_schedules.html', 'text': 'config = \"\n",
      " 'JobConfig(\\\\n  name=\"my-job\",\\\\n  entrypoint=\"python main.py\",\\\\n  '\n",
      " 'working_dir=\".\",\\\\n  max_retries=5,\\\\n  '\n",
      " 'image_uri=\"anyscale/image/IMAGE_NAME:VERSION\",\\\\n  '\n",
      " 'compute_config=\"COMPUTE_CONFIG_NAME\"\\\\n)\\\\n\\\\nanyscale.job.submit(config) '\n",
      " 'With the SDK, you can either specify an existing compute config or define a '\n",
      " 'new one using the compute config API. For more information on submitting '\n",
      " 'jobs with the SDK, see the reference docs. For a complete list of supported '\n",
      " 'options defining a JobConfig, see the reference docs for JobConfig. tip For '\n",
      " 'large-scale, compute-intensive jobs, avoid scheduling Ray tasks onto the '\n",
      " 'head node because it manages cluster-level orchestration. To do that, set '\n",
      " 'the CPU resource on the head node to 0 in your compute config. Defining a '\n",
      " 'job\\\\u200b With the CLI, you can define jobs in a YAML file and submit them '\n",
      " 'by referencing the YAML: anyscale job submit --config-file config.yaml For '\n",
      " 'an example of defining a job in a YAML, see the reference docs. Waiting on a '\n",
      " 'job\\\\u200b You can block CLI and SDK commands until a job enters a specified '\n",
      " 'state. By default, JobState.SUCCEEDED is used. See all available states in '\n",
      " 'the reference docs. CLI Python SDK anyscale job wait -n job-wait When you '\n",
      " 'submit a job, you can specify --wait, which waits for the job to succeed or '\n",
      " 'exits if the job fails. anyscale job submit -n job-wait --wait -- sleep 30 '\n",
      " 'For more information on submitting jobs with the CLI, see the reference '\n",
      " 'docs. import anyscale\\\\nfrom anyscale.job.models import '\n",
      " 'JobConfig\\\\n\\\\nconfig = JobConfig(name=\"job-wait\", entrypoint=\"sleep '\n",
      " '30\")\\\\n\\\\nanyscale.job.submit(config)\\\\nanyscale.job.wait(name=\"job-wait\") '\n",
      " 'For more information on submitting jobs with the SDK, see the reference '\n",
      " 'docs. Terminating a job\\\\u200b You can terminate a job from the Job page or '\n",
      " 'using the CLI/SDK: CLI Python SDK anyscale job terminate --id '\n",
      " \"\\\\'prodjob_...\\\\' For more information on terminating jobs with the CLI, see \"\n",
      " \"the reference docs. import anyscale', 'distance': 0.12725389003753662, \"\n",
      " \"'score': 0.8727461099624634}, {'chunk_index': 10, 'chunk_id': \"\n",
      " \"'f1e981ec-d129-49a0-8df6-86fc1da99d7f', 'doc_id': \"\n",
      " \"'e747a479-126c-42ed-9f20-f84038229e7b', 'page_number': 1, 'source': \"\n",
      " \"'anyscale-rag-application/100-docs/Monitor_a_job.docx', 'text': 'to look \"\n",
      " \"back. Anyscale stores up to 30 days of logs for your job. You\\\\'re able to \"\n",
      " 'debug issues even after the job terminates. To filter the logs, use the '\n",
      " 'search bar to search for specific keywords. Enter a request ID in the search '\n",
      " 'bar to filter logs for a specific request. You can also use contain a '\n",
      " 'specific pattern. Alerts to filter logs if your logs Anyscale jobs have a '\n",
      " 'built-in alert for when a job succeeds or fails. The creator of the job '\n",
      " 'receives an email notification when the job completes. To set up additional '\n",
      " 'alerts based on your own criteria, see Custom dashboards and alerting guide. '\n",
      " 'These alerts are useful for tracking the health of your jobs or job queues. '\n",
      " 'Ray Dashboard The Ray Dashboard is scoped to a single Ray cluster. Each job '\n",
      " 'attempt launches a new Ray cluster unless Job queues are used. To access '\n",
      " 'this dashboard, click the \"Ray Dashboard\" tab in the job detail page. To '\n",
      " 'learn more about how to use the Ray Dashboard, see the Ray documentation. '\n",
      " 'Exporting logs and metrics If you want to push logs to Vector, a tool to '\n",
      " 'ship logs to Amazon CloudWatch, Google Cloud Monitoring, Datadog, or other '\n",
      " 'observability tools, see Exporting logs and metrics with Vector. More info '\n",
      " 'To learn more details about the Ray Dashboard, see the Ray Dashboard '\n",
      " 'documentation To learn more about Grafana and how to use it, see the '\n",
      " 'official Grafana documentation To learn more about the metrics that Ray '\n",
      " \"emits, see the System Metrics documentation', 'distance': \"\n",
      " \"0.1282671093940735, 'score': 0.8717328906059265}]\\n\"\n",
      " '\\n'\n",
      " 'Question: what is a anyscale job\\n'\n",
      " '\\n'\n",
      " 'Helpful Answer:')\n"
     ]
    }
   ],
   "source": [
    "\n",
    "EMBEDDER_MODEL_NAME = \"intfloat/multilingual-e5-large-instruct\"\n",
    "CHROMA_PATH = \"/mnt/cluster_storage/vector_store\"\n",
    "CHROMA_COLLECTION_NAME = \"anyscale_jobs_docs_embeddings\"\n",
    "\n",
    "\n",
    "# Initialize the querier.\n",
    "querier = ChromaQuerier(CHROMA_PATH, CHROMA_COLLECTION_NAME, score_threshold=0.8)\n",
    "embedder = Embedder(EMBEDDER_MODEL_NAME)\n",
    "\n",
    "# Perform the user request.\n",
    "user_request = \"what is a anyscale job\"\n",
    "embedding = embedder.embed_single(user_request)\n",
    "formatted_results = querier.query(embedding, n_results=10)\n",
    "\n",
    "# Print the formatted query results.\n",
    "print(\"Query Results:\")\n",
    "pprint(formatted_results)\n",
    "\n",
    "# Render the prompt.\n",
    "prompt = render_basic_rag_prompt(user_request, context=formatted_results)\n",
    "print(\"Rendered Prompt:\")\n",
    "pprint(prompt)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "14344f15",
   "metadata": {},
   "source": [
    "\n",
    "## Get the Final Response\n",
    "Stream the response from the LLM using the generated prompt.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fe774304",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Anyscale Jobs are used to run discrete workloads in production, such as batch inference or model fine-tuning, on a standalone Ray cluster for scalable and reliable performance. Thanks for asking!"
     ]
    }
   ],
   "source": [
    "for token in client.get_response_streaming(prompt, temperature=0.5):\n",
    "    print(token, end=\"\")\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "6824d63f",
   "metadata": {},
   "source": [
    "## Observations and Next Steps\n",
    "\n",
    "As you can see, the LLM now understands that an Anyscale job refers to the platform rather than to job openings at Anyscale. This demonstrates the power of RAG. However, the response from this basic RAG implementation is not optimal: it’s too short and lacks citations, so the sources of the information are unclear. In future tutorials, we will assess these issues and propose several methods to address them through prompt engineering."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
